package com.xiaojie.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.transaction.KafkaTransactionManager;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka configuration: declares the topics used by the application and the
 * listener container factories that {@code @KafkaListener} methods can refer
 * to by bean name.
 *
 * @author xiaojie
 * @date 2021.10.14
 */
@Configuration
public class KafkaConfig {

    /** Partition count shared by every topic declared in this class. */
    private static final int PARTITIONS = 6;

    /** Poll timeout applied to the manual-ack batch factories (passed to {@code setPollTimeout}). */
    private static final long POLL_TIMEOUT = 1500L;

    /*
     * Topic declarations.
     * NewTopic arguments: first the topic name, second the number of
     * partitions, third the number of replicas.
     */

    /**
     * Declares the {@code my-topic} topic (6 partitions, 3 replicas).
     *
     * @return the topic definition
     */
    @Bean
    public NewTopic newTopic() {
        return new NewTopic("my-topic", PARTITIONS, (short) 3);
    }

    /**
     * Declares the {@code my-topic-partition} topic (6 partitions, 3 replicas).
     *
     * @return the topic definition
     */
    @Bean
    public NewTopic myTopic() {
        return new NewTopic("my-topic-partition", PARTITIONS, (short) 3);
    }

    /**
     * Declares the {@code test-topic} topic (6 partitions, 2 replicas).
     *
     * @return the topic definition
     */
    @Bean
    public NewTopic testTopic() {
        return new NewTopic("test-topic", PARTITIONS, (short) 2);
    }

    /**
     * Declares the {@code xiaojie-test-topic} topic (6 partitions, 3 replicas).
     *
     * @return the topic definition
     */
    @Bean
    public NewTopic xiaojieTopic() {
        return new NewTopic("xiaojie-test-topic", PARTITIONS, (short) 3);
    }

    /**
     * Batch listener factory using {@code AckMode.MANUAL}: the listener is
     * responsible for acknowledging (via an AcknowledgingMessageListener);
     * the acks are queued and the offsets are committed once the listener has
     * processed all records returned by the previous poll.
     *
     * @param consumerFactory consumer factory used to create the consumers
     * @return a batch listener container factory with manual acknowledgment
     */
    @Bean("manualListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> manualListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        return manualAckBatchFactory(consumerFactory, ContainerProperties.AckMode.MANUAL);
    }

    /**
     * Batch listener factory using {@code AckMode.MANUAL_IMMEDIATE}: when the
     * acknowledgment is made on the consumer thread the commit is performed
     * immediately; otherwise it behaves like {@code MANUAL}.
     *
     * @param consumerFactory consumer factory used to create the consumers
     * @return a batch listener container factory with immediate manual acknowledgment
     */
    @Bean("manualImmediateListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> manualImmediateListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        return manualAckBatchFactory(consumerFactory, ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    }

    /*
     * AckMode overview:
     *   RECORD            commit after each record has been processed by the listener
     *   BATCH             commit after each batch of records has been processed by the listener
     *   TIME              commit at a time interval; commits automatically once the interval is exceeded
     *   COUNT             commit per number of records; commits automatically once the count is exceeded
     *   COUNT_TIME        commit when either the time or the count condition is met
     *   MANUAL_IMMEDIATE
     *   MANUAL
     */

    /**
     * Message filter factory: discards every record whose value does not
     * contain {@code "hello"}. A record with a {@code null} value cannot
     * contain it and is therefore discarded as well, instead of failing with
     * a {@link NullPointerException}.
     *
     * @param consumerFactory consumer factory used to create the consumers
     * @return a listener container factory with the filter installed
     * @author xiaojie
     * @date 2021/10/14 22:09
     */
    @Bean("filterFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> filterFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Ask the container to acknowledge the records that the filter discards.
        factory.setAckDiscarded(true);
        factory.setRecordFilterStrategy(consumerRecord -> {
            String value = consumerRecord.value();
            if (value != null && value.contains("hello")) {
                // false: the record is not filtered and continues to the listener.
                return false;
            }
            System.out.println("....................");
            // true: the record is filtered out.
            return true;
        });
        return factory;
    }

    /**
     * Builds a batch listener container factory whose offsets are committed
     * manually, using the given acknowledgment mode.
     *
     * @param consumerFactory consumer factory used to create the consumers
     * @param ackMode         the manual acknowledgment mode to apply
     * @return the configured factory
     */
    private static ConcurrentKafkaListenerContainerFactory<String, String> manualAckBatchFactory(
            ConsumerFactory<String, String> consumerFactory, ContainerProperties.AckMode ackMode) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setPollTimeout(POLL_TIMEOUT);
        // Batch mode: the consuming side must receive the messages as a batch.
        factory.setBatchListener(true);
        // Offsets are committed manually.
        factory.getContainerProperties().setAckMode(ackMode);
        return factory;
    }
}
